package com.zk.config;

import com.zk.properties.ZookeeperProperties;
import com.zk.service.SRLockDealCallback;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * zookeeper客户端
 */
@Data
@Slf4j
public class ZkClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private CuratorFramework client;
    public TreeCache cache;
    public NodeCache nodeCache;
    public PathChildrenCache pathChildrenCache;
    private ZookeeperProperties zookeeperProperties;

    public ZkClient(ZookeeperProperties zookeeperProperties){
        this.zookeeperProperties = zookeeperProperties;
    }

    /**
     * 初始化zookeeper客户端
     * connectionString zk地址
     * sessionTimeoutMs 会话超时时间
     * connectionTimeoutMs 连接超时时间
     * namespace 每个curatorFramework 可以设置一个独立的命名空间,之后操作都是基于该命名空间，比如操作 /app1/message 其实操作的是/node1/app1/message
     * retryPolicy 重试策略
     */
    public void init() {
        try{
            //ExponentialBackoffRetry的重试策略
            //baseSleepTimeMs初始sleep时间
            //maxRetries最多重试几次
            //public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries);
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),
                    zookeeperProperties.getMaxRetries());
            Builder builder   = CuratorFrameworkFactory.builder()
                    .connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy)
                    .sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs())
                    .connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs())
                    .namespace( zookeeperProperties.getNamespace());
            if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){
                builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));
                builder.aclProvider(new ACLProvider() {
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }

                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
            }
            client = builder.build();


            setTreeCacheListener("/test");

            setPathCacheListener("/test", true);

            setNodeCacheListener("/test", true);

            //   addConnectionStateListener();


            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        //连接丢失
                        logger.info("lost session with zookeeper");
                    } else if (state == ConnectionState.CONNECTED) {
                        //连接新建
                        logger.info("connected with zookeeper");
                    } else if (state == ConnectionState.RECONNECTED) {
                        logger.info("reconnected with zookeeper");
                    }
                }
            });

            client.start();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    /*
     *  设置Path Cache, 监控本节点的子节点被创建,更新或者删除，注意是子节点, 子节点下的子节点不能递归监控
     *  事件类型有3个, 可以根据不同的动作触发不同的动作
     *  本例子只是演示, 所以只是打印了状态改变的信息, 并没有在PathChildrenCacheListener中实现复杂的逻辑
     *  @Param path 监控的节点路径, cacheData 是否缓存data
     *  可重入监听
     * */
    public void setPathCacheListener(String path, boolean cacheData) {
        try {
            pathChildrenCache = new PathChildrenCache(client, path, cacheData);
            PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
                    ChildData data = event.getData();
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            logger.info("子节点增加, path={}, data={}", data.getPath(), data.getData());
                            break;
                        case CHILD_UPDATED:
                            logger.info("子节点更新, path={}, data={}", data.getPath(), data.getData());
                            break;
                        case CHILD_REMOVED:
                            logger.info("子节点删除, path={}, data={}", data.getPath(), data.getData());
                            break;
                        default:
                            break;
                    }
                }
            };
            pathChildrenCache.getListenable().addListener(childrenCacheListener);
            pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        } catch (Exception e) {
            logger.error("PathCache监听失败, path=", path);
        }

    }

    /*
     *  设置Node Cache, 监控本节点的新增,删除,更新
     *  节点的update可以监控到, 如果删除会自动再次创建空节点
     *  本例子只是演示, 所以只是打印了状态改变的信息, 并没有在NodeCacheListener中实现复杂的逻辑
     *  @Param path 监控的节点路径, dataIsCompressed 数据是否压缩
     *  不可重入监听
     * */
    public void setNodeCacheListener(String path, boolean dataIsCompressed) {
        try {
            nodeCache = new NodeCache(client, path, dataIsCompressed);
            NodeCacheListener nodeCacheListener = new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData childData = nodeCache.getCurrentData();
                    logger.info("ZNode节点状态改变, path={}", childData.getPath());
                    logger.info("ZNode节点状态改变, data={}", childData.getData());
                    logger.info("ZNode节点状态改变, stat={}", childData.getStat());
                }
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start();
        } catch (Exception e) {
            logger.error("创建NodeCache监听失败, path={}", path);
        }
    }


    /**
     *  设置Tree Cache, 监控本节点的新增,删除,更新
     *  节点的update可以监控到, 如果删除不会自动再次创建
     *  本例子只是演示, 所以只是打印了状态改变的信息, 并没有在NodeCacheListener中实现复杂的逻辑
     * @param watchRootPath
     * @throws Exception
     */
    private void setTreeCacheListener(String watchRootPath) throws Exception {
        cache = new TreeCache(client, watchRootPath);
        TreeCacheListener listener = new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if(data != null){
                    switch (event.getType()) {
                        case NODE_ADDED:
                            logger.info("[TreeCache]节点增加, path={}, data={}", data.getPath(), new String(data.getData(),"utf-8"));
                            break;
                        case NODE_UPDATED:
                            logger.info("[TreeCache]节点更新, path={}, data={}", data.getPath(), new String(data.getData(),"utf-8"));
                            break;
                        case NODE_REMOVED:
                            logger.info("[TreeCache]节点删除, path={}, data={}", data.getPath(), new String(data.getData(),"utf-8"));
                            break;
                        default:
                            break;
                    }
                }else{
                    logger.info("[TreeCache]节点数据为空");
                }
            }
        };

        cache.getListenable().addListener(listener);
        cache.start();
    }


    public void stop() {
        client.close();
    }

    public CuratorFramework getClient() {
        return client;
    }


    /**
     * 创建节点
     * @param mode       节点类型
     *                   1、PERSISTENT 持久化目录节点，存储的数据不会丢失。
     *                   2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点，存储的数据不会丢失
     *                   3、EPHEMERAL临时目录节点，一旦创建这个节点的客户端与服务器端口也就是session 超时，这种节点会被自动删除
     *                   4、EPHEMERAL_SEQUENTIAL临时自动编号节点，一旦创建这个节点的客户端与服务器端口也就是session 超时，这种节点会被自动删除，并且根据当前已近存在的节点数自动加 1，然后返回给客户端已经成功创建的目录节点名。
     * @param path  节点名称
     * @param nodeData  节点数据
     */
    public void createNode(CreateMode mode, String path , String nodeData) {
        try {
            //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
            client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
    }

    /**
     * 创建节点
     * @param mode       节点类型
     *                   1、PERSISTENT 持久化目录节点，存储的数据不会丢失。
     *                   2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点，存储的数据不会丢失
     *                   3、EPHEMERAL临时目录节点，一旦创建这个节点的客户端与服务器端口也就是session 超时，这种节点会被自动删除
     *                   4、EPHEMERAL_SEQUENTIAL临时自动编号节点，一旦创建这个节点的客户端与服务器端口也就是session 超时，这种节点会被自动删除，并且根据当前已近存在的节点数自动加 1，然后返回给客户端已经成功创建的目录节点名。
     * @param path  节点名称
     */
    public void createNode(CreateMode mode,String path ) {
        try {
            //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
            client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
    }

    /**
     * 删除节点数据
     *
     * @param path
     */
    public void deleteNode(final String path) {
        try {
            deleteNode(path,true);
        } catch (Exception ex) {
            log.error("{}",ex);
        }
    }


    /**
     * 删除节点数据
     * @param path
     * @param deleteChildre   是否删除子节点
     */
    public void deleteNode(final String path,Boolean deleteChildre){
        try {
            if(deleteChildre){
                //guaranteed()删除一个节点，强制保证删除,
                // 只要客户端会话有效，那么Curator会在后台持续进行删除操作，直到删除节点成功
                client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
            }else{
                client.delete().guaranteed().forPath(path);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 设置指定节点的数据
     * @param path
     * @param datas
     */
    public void setNodeData(String path, byte[] datas){
        try {
            client.setData().forPath(path, datas);
        }catch (Exception ex) {
            log.error("{}",ex);
        }
    }

    /**
     * 获取指定节点的数据
     * @param path
     * @return
     */
    public byte[] getNodeData(String path){
        Byte[] bytes = null;
        try {
            if(cache != null){
                ChildData data = cache.getCurrentData(path);
                if(data != null){
                    return data.getData();
                }
            }
            client.getData().forPath(path);
            return client.getData().forPath(path);
        }catch (Exception ex) {
            log.error("{}",ex);
        }
        return null;
    }

    /**
     * 获取数据时先同步
     * @param path
     * @return
     */
    public byte[] synNodeData(String path){
        client.sync();
        return getNodeData( path);
    }

    /**
     * 判断路径是否存在
     *
     * @param path
     * @return
     */
    public boolean isExistNode(final String path) {
        client.sync();
        try {
            return null != client.checkExists().forPath(path);
        } catch (Exception ex) {
            return false;
        }
    }


    /**
     * 获取节点的子节点
     * @param path
     * @return
     */
    public List<String> getChildren(String path) {
        List<String> childrenList = new ArrayList<>();
        try {
            childrenList = client.getChildren().forPath(path);
        } catch (Exception e) {
            logger.error("获取子节点出错", e);
        }
        return childrenList;
    }

    /**
     * 随机读取一个path子路径, "/"为根节点对应该namespace
     * 先从cache中读取，如果没有，再从zookeeper中查询
     * @param path
     * @return
     * @throws Exception
     */
    public String getRandomData(String path)  {
        try{
            Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);
            if(cacheMap != null && cacheMap.size() > 0) {
                logger.debug("get random value from cache,path="+path);
                Collection<ChildData> values = cacheMap.values();
                List<ChildData> list = new ArrayList<>(values);
                Random rand = new Random();
                byte[] b = list.get(rand.nextInt(list.size())).getData();
                return new String(b,"utf-8");
            }
            if(isExistNode(path)) {
                logger.debug("path [{}] is not exists,return null",path);
                return null;
            } else {
                logger.debug("read random from zookeeper,path="+path);
                List<String> list = client.getChildren().forPath(path);
                if(list == null || list.size() == 0) {
                    logger.debug("path [{}] has no children return null",path);
                    return null;
                }
                Random rand = new Random();
                String child = list.get(rand.nextInt(list.size()));
                path = path + "/" + child;
                byte[] b = client.getData().forPath(path);
                String value = new String(b,"utf-8");
                return value;
            }
        }catch(Exception e){
            log.error("{}",e);
        }
        return null;

    }

    /**
     * 可重入共享锁  -- Shared Reentrant Lock
     * @param lockPath
     * @param time
     * @param dealWork 获取
     * @return
     */
    public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        try { //InterProcessMutex基于Zookeeper实现了分布式的公平可重入互斥锁，类似于单个JVM进程内的ReentrantLock(fair=true)
            if (!lock.acquire(time, TimeUnit.SECONDS)) {
                log.error("get lock fail:{}", " could not acquire the lock");
                return null;
            }
            log.debug("{} get the lock",lockPath);
            Object b = dealWork.deal();
            return b;
        }catch(Exception e){
            log.error("{}", e);
        }finally{
            try {
                lock.release();
            } catch (Exception e) {
                //log.error("{}",e);
            }
        }
        return null;
    }

    /**
     * 获取读写锁
     * @param path
     * @return
     */
    public InterProcessReadWriteLock getReadWriteLock(String path){
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
        return readWriteLock;
    }

    /**
     * 在注册监听器的时候，如果传入此参数，当事件触发时，逻辑由线程池处理
     */
    ExecutorService pool = Executors.newFixedThreadPool(2);

    /**
     * 监听数据节点的变化情况
     * @param watchPath
     * @param listener
     */
    public void watchPath(String watchPath,TreeCacheListener listener){
        //   NodeCache nodeCache = new NodeCache(client, watchPath, false);
        TreeCache cache = new TreeCache(client, watchPath);
        cache.getListenable().addListener(listener,pool);
        try {
            cache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}